{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Horovod Distributed Training with SageMaker TensorFlow Script Mode"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Horovod is a distributed training framework based on the Message Passing Interface (MPI). For information about Horovod, see the [Horovod README](https://github.com/uber/horovod).\n",
    "\n",
    "You can perform distributed training with Horovod on SageMaker by using the SageMaker TensorFlow container. If MPI is enabled when you create the training job, SageMaker creates the MPI environment and runs the `mpirun` command to execute the training script. Details on how to configure MPI settings for the training job are described later in this example.\n",
    "\n",
    "In this example notebook, we create a Horovod training job that uses the MNIST data set."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set up the environment\n",
    "\n",
    "We get the `IAM` role that this notebook is running as and pass that role to the TensorFlow estimator that SageMaker uses to get data and perform training."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import os\n",
    "from sagemaker.utils import sagemaker_timestamp\n",
    "from sagemaker.tensorflow import TensorFlow\n",
    "from sagemaker import get_execution_role\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "\n",
    "default_s3_bucket = sagemaker_session.default_bucket()\n",
    "sagemaker_iam_role = get_execution_role()\n",
    "\n",
    "train_script = \"mnist_hvd.py\"\n",
    "instance_count = 2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare Data for training\n",
    "\n",
    "Now we download the MNIST dataset to the local `/tmp/data/` directory and then upload it to an S3 bucket. After uploading the dataset to S3, we delete the data from `/tmp/data/`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import shutil\n",
    "\n",
    "import numpy as np\n",
    "\n",
    "import keras\n",
    "from keras.datasets import mnist\n",
    "(x_train, y_train), (x_test, y_test) = mnist.load_data()\n",
    "\n",
    "s3_train_path = \"s3://{}/mnist/train.npz\".format(default_s3_bucket)\n",
    "s3_test_path = \"s3://{}/mnist/test.npz\".format(default_s3_bucket)\n",
    "\n",
    "# Create local directory\n",
    "! mkdir -p /tmp/data/mnist_train\n",
    "! mkdir -p /tmp/data/mnist_test\n",
    "\n",
    "# Save data locally\n",
    "np.savez('/tmp/data/mnist_train/train.npz', data=x_train, labels=y_train)\n",
    "np.savez('/tmp/data/mnist_test/test.npz', data=x_test, labels=y_test)\n",
    "\n",
    "# Upload the dataset to s3\n",
    "! aws s3 cp /tmp/data/mnist_train/train.npz $s3_train_path\n",
    "! aws s3 cp /tmp/data/mnist_test/test.npz $s3_test_path\n",
    "\n",
    "print('training data at ', s3_train_path)\n",
    "print('test data at ', s3_test_path)\n",
    "! rm -rf /tmp/data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Write a script for Horovod distributed training\n",
    "\n",
    "This example is based on the [Keras MNIST Horovod example](https://github.com/uber/horovod/blob/master/examples/keras_mnist.py) in the Horovod GitHub repository.\n",
    "\n",
    "To run this script, we have to make the following modifications:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Accept `--model_dir` as a command-line argument\n",
    "Modify the script to accept `model_dir` as a command-line argument that defines the directory path (i.e. `/opt/ml/model/`) where the output model is saved. SageMaker deletes the training cluster when training completes, but it first writes the data stored in `/opt/ml/model/` to an S3 bucket, so saving the model to this directory prevents the trained model from being lost.\n",
    "\n",
    "This also allows the SageMaker training job to integrate with other SageMaker services, such as hosted inference endpoints or batch transform jobs. It also allows you to host the trained model outside of SageMaker.\n",
    "\n",
    "The following code adds `model_dir` as a command-line argument to the script:\n",
    "\n",
    "```\n",
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument('--model_dir', type=str)\n",
    "```\n",
    "\n",
    "More details can be found [here](https://github.com/aws/sagemaker-containers/blob/master/README.rst)."
   ]
  },
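  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a minimal sketch of the same idea (not the exact code in `mnist_hvd.py`), the parser below falls back to the `SM_MODEL_DIR` environment variable when `--model_dir` is not passed. The `parse_args` helper and the `/tmp/model` fallback are assumptions made for illustration:\n",
    "\n",
    "```python\n",
    "import argparse\n",
    "import os\n",
    "\n",
    "\n",
    "def parse_args(argv=None):\n",
    "    # Hypothetical helper: returns the parsed command-line arguments.\n",
    "    parser = argparse.ArgumentParser()\n",
    "    # Assumed fallback order: --model_dir, then SM_MODEL_DIR, then /tmp/model.\n",
    "    default_dir = os.environ.get('SM_MODEL_DIR', '/tmp/model')\n",
    "    parser.add_argument('--model_dir', type=str, default=default_dir)\n",
    "    # parse_known_args ignores any other hyperparameters passed to the script.\n",
    "    args, _ = parser.parse_known_args(argv)\n",
    "    return args\n",
    "\n",
    "\n",
    "args = parse_args(['--model_dir', '/opt/ml/model', '--epochs', '1'])\n",
    "print(args.model_dir)\n",
    "```\n",
    "\n",
    "Using `parse_known_args` is one way to keep the script from failing when extra arguments are passed that it does not declare."
   ]
  },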
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. Load train and test data\n",
    "\n",
    "You can get the local directory paths where the `train` and `test` data are downloaded by reading the environment variables `SM_CHANNEL_TRAIN` and `SM_CHANNEL_TEST`, respectively.\n",
    "After you get the directory path, load the data into memory.\n",
    "\n",
    "Here is the code:\n",
    "\n",
    "```\n",
    "x_train = np.load(os.path.join(os.environ['SM_CHANNEL_TRAIN'], 'train.npz'))['data']\n",
    "y_train = np.load(os.path.join(os.environ['SM_CHANNEL_TRAIN'], 'train.npz'))['labels']\n",
    "\n",
    "x_test = np.load(os.path.join(os.environ['SM_CHANNEL_TEST'], 'test.npz'))['data']\n",
    "y_test = np.load(os.path.join(os.environ['SM_CHANNEL_TEST'], 'test.npz'))['labels']\n",
    "```\n",
    "\n",
    "For a list of all environment variables set by SageMaker that are accessible inside a training script, see [SageMaker Containers](https://github.com/aws/sagemaker-containers/blob/master/README.rst)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3. Save the model only at the master node\n",
    "\n",
    "Because in Horovod the training is distributed to multiple nodes, the model should only be saved by the master node. The following code in the script does this:\n",
    "\n",
    "```\n",
    "# Horovod: Save model only on worker 0 (i.e. master)\n",
    "if hvd.rank() == 0:\n",
    "    saved_model_path = tf.contrib.saved_model.save_keras_model(model, args.model_dir)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Training script\n",
    "\n",
    "Here is the final training script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!cat 'mnist_hvd.py'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Test locally using SageMaker Python SDK TensorFlow Estimator\n",
    "\n",
    "You can use the SageMaker Python SDK TensorFlow estimator to easily train locally and in SageMaker.\n",
    "\n",
    "This notebook shows how to use the SageMaker Python SDK to run your code in a local container before deploying to SageMaker's managed training or hosting environments. Just change your estimator's `train_instance_type` to `local` or `local_gpu`. For more information, see: https://github.com/aws/sagemaker-python-sdk#local-mode.\n",
    "\n",
    "To use this feature, you need to install docker-compose (and nvidia-docker if you are training with a GPU). Run the following script to install docker-compose or nvidia-docker-compose and configure the notebook environment.\n",
    "\n",
    "**Note**: You can only run a single local notebook at a time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!/bin/bash ./setup.sh"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To train locally, set `train_instance_type` to `local`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_instance_type='local'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The MPI environment for Horovod can be configured by setting the following flags in the `mpi` field of the `distributions` dictionary that you pass to the TensorFlow estimator:\n",
    "\n",
    "* ``enabled (bool)``: If set to ``True``, the MPI setup is performed and the ``mpirun`` command is executed.\n",
    "* ``processes_per_host (int) [Optional]``: Number of processes MPI should launch on each host. This should not be greater than the number of available slots on the selected instance type. Set this flag for multi-CPU/GPU training.\n",
    "* ``custom_mpi_options (str) [Optional]``: Any ``mpirun`` flags passed in this field are added to the ``mpirun`` command that SageMaker executes to launch distributed Horovod training.\n",
    "\n",
    "For more information about the `distributions` dictionary, see the SageMaker Python SDK [README](https://github.com/aws/sagemaker-python-sdk/blob/v1.17.3/src/sagemaker/tensorflow/README.rst).\n",
    "\n",
    "First, enable MPI:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "distributions = {'mpi': {'enabled': True}}"
   ]
  },
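  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The three flags can be combined in a single dictionary. The helper below is a hypothetical convenience wrapper (`build_mpi_distributions` is not part of the SageMaker Python SDK); it only assembles the dictionary described above and checks that `processes_per_host` is positive:\n",
    "\n",
    "```python\n",
    "def build_mpi_distributions(processes_per_host=None, custom_mpi_options=None):\n",
    "    # Hypothetical helper, not an SDK function: builds the 'mpi' settings.\n",
    "    mpi = {'enabled': True}\n",
    "    if processes_per_host is not None:\n",
    "        if processes_per_host < 1:\n",
    "            raise ValueError('processes_per_host must be at least 1')\n",
    "        mpi['processes_per_host'] = processes_per_host\n",
    "    if custom_mpi_options:\n",
    "        mpi['custom_mpi_options'] = custom_mpi_options\n",
    "    return {'mpi': mpi}\n",
    "\n",
    "\n",
    "example = build_mpi_distributions(processes_per_host=2, custom_mpi_options='-verbose')\n",
    "print(example)\n",
    "```\n",
    "\n",
    "Optional keys are left out when they are not set, so the result matches the plain dictionaries used in the rest of this notebook."
   ]
  },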
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we create the TensorFlow estimator, passing the `train_instance_type` and `distributions`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator_local = TensorFlow(entry_point=train_script,\n",
    "                       role=sagemaker_iam_role,\n",
    "                       train_instance_count=instance_count,\n",
    "                       train_instance_type=train_instance_type,\n",
    "                       script_mode=True,\n",
    "                       framework_version='1.13',\n",
    "                       py_version='py3',\n",
    "                       distributions=distributions,\n",
    "                       base_job_name='hvd-mnist-local')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Call `fit()` to start the local training "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator_local.fit({\"train\":s3_train_path, \"test\":s3_test_path})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train in SageMaker\n",
    "\n",
    "After you test the training job locally, run it on SageMaker:\n",
    "\n",
    "First, change the instance type from `local` to a valid EC2 instance type, for example `ml.c4.xlarge`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_instance_type='ml.c4.xlarge'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can also provide custom MPI options in the `custom_mpi_options` field of the `distributions` dictionary. These options are added to the `mpirun` command executed by SageMaker:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "distributions = {'mpi': {'enabled': True, \"custom_mpi_options\": \"-verbose --NCCL_DEBUG=INFO\"}}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we create the TensorFlow estimator, passing the `train_instance_type` and `distributions`, to launch the training job in SageMaker."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = TensorFlow(entry_point=train_script,\n",
    "                       role=sagemaker_iam_role,\n",
    "                       train_instance_count=instance_count,\n",
    "                       train_instance_type=train_instance_type,\n",
    "                       script_mode=True,\n",
    "                       framework_version='1.13',\n",
    "                       py_version='py3',\n",
    "                       distributions=distributions,\n",
    "                       base_job_name='hvd-mnist')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Call `fit()` to start the training"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.fit({\"train\":s3_train_path, \"test\":s3_test_path})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Horovod training in SageMaker using multiple CPU/GPU"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To enable multiple CPUs or GPUs for Horovod training, set the `processes_per_host` field in the `mpi` section of the `distributions` dictionary to the number of processes that should run on each instance."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "distributions = {'mpi': {'enabled': True, \"processes_per_host\": 2}}"
   ]
  },
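  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With this setting, MPI launches `processes_per_host` processes on each instance, so the total number of Horovod processes is the instance count multiplied by `processes_per_host`. A quick sketch of the arithmetic, using the values from this notebook (the `total_processes` name is illustrative):\n",
    "\n",
    "```python\n",
    "instance_count = 2        # same value as set at the top of this notebook\n",
    "processes_per_host = 2    # value used in the distributions dictionary above\n",
    "\n",
    "# Each instance runs processes_per_host workers, so ranks run from 0 to total - 1.\n",
    "total_processes = instance_count * processes_per_host\n",
    "print('Horovod ranks:', list(range(total_processes)))\n",
    "```"
   ]
  },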
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we create the TensorFlow estimator, passing the `train_instance_type` and `distributions`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = TensorFlow(entry_point=train_script,\n",
    "                       role=sagemaker_iam_role,\n",
    "                       train_instance_count=instance_count,\n",
    "                       train_instance_type=train_instance_type,\n",
    "                       script_mode=True,\n",
    "                       framework_version='1.13',\n",
    "                       py_version='py3',\n",
    "                       distributions=distributions,\n",
    "                       base_job_name='hvd-mnist-multi-cpu')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Call `fit()` to start the training"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.fit({\"train\":s3_train_path, \"test\":s3_test_path})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Improving Horovod training performance on SageMaker\n",
    "\n",
    "Performing Horovod training inside a VPC improves the network latency between nodes, leading to higher performance and stability of Horovod training jobs.\n",
    "\n",
    "For a detailed explanation of how to configure a VPC for SageMaker training, see [Secure Training and Inference with VPC](https://github.com/aws/sagemaker-python-sdk#secure-training-and-inference-with-vpc)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set up VPC infrastructure\n",
    "We will set up the following resources as part of the VPC stack:\n",
    "* `VPC`: AWS Virtual Private Cloud with a CIDR block.\n",
    "* `Subnets`: Two subnets with the CIDR blocks `10.0.0.0/24` and `10.0.1.0/24`.\n",
    "* `Security Group`: Defines the open ingress and egress ports, such as TCP.\n",
    "* `VpcEndpoint`: S3 VPC endpoint that allows SageMaker's VPC cluster to download data from S3.\n",
    "* `Route Table`: Defines routes and is tied to the subnets and the VPC.\n",
    "\n",
    "The complete CloudFormation template for setting up the VPC stack can be seen [here](./vpc_infra_cfn.json)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "from botocore.exceptions import ClientError\n",
    "from time import sleep\n",
    "\n",
    "def create_vpn_infra(stack_name=\"hvdvpcstack\"):\n",
    "    cfn = boto3.client(\"cloudformation\")\n",
    "\n",
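    "    # Read the CloudFormation template and close the file afterwards\n",
    "    with open(\"vpc_infra_cfn.json\", \"r\") as template_file:\n",
    "        cfn_template = template_file.read()\n",
    "\n",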
    "    try:\n",
    "        vpn_stack = cfn.create_stack(StackName=(stack_name),\n",
    "                                     TemplateBody=cfn_template)\n",
    "    except ClientError as e:\n",
    "        if e.response['Error']['Code'] == 'AlreadyExistsException':\n",
    "            print(\"Stack: {} already exists, so skipping stack creation.\".format(stack_name))\n",
    "        else:\n",
    "            print(\"Unexpected error: %s\" % e)\n",
    "            raise e\n",
    "\n",
    "    describe_stack = cfn.describe_stacks(StackName=stack_name)[\"Stacks\"][0]\n",
    "\n",
    "    while describe_stack[\"StackStatus\"] == \"CREATE_IN_PROGRESS\":\n",
    "        describe_stack = cfn.describe_stacks(StackName=stack_name)[\"Stacks\"][0]\n",
    "        sleep(0.5)\n",
    "\n",
    "    if describe_stack[\"StackStatus\"] != \"CREATE_COMPLETE\":\n",
    "        raise ValueError(\"Stack creation failed in state: {}\".format(describe_stack[\"StackStatus\"]))\n",
    "\n",
    "    print(\"Stack: {} created successfully with status: {}\".format(stack_name, describe_stack[\"StackStatus\"]))\n",
    "\n",
    "    subnets = []\n",
    "    security_groups = []\n",
    "\n",
    "    for output_field in describe_stack[\"Outputs\"]:\n",
    "\n",
    "        if output_field[\"OutputKey\"] == \"SecurityGroupId\":\n",
    "            security_groups.append(output_field[\"OutputValue\"])\n",
    "        if output_field[\"OutputKey\"] == \"Subnet1Id\" or output_field[\"OutputKey\"] == \"Subnet2Id\":\n",
    "            subnets.append(output_field[\"OutputValue\"])\n",
    "\n",
    "    return subnets, security_groups\n",
    "\n",
    "\n",
    "subnets, security_groups = create_vpn_infra()\n",
    "print(\"Subnets: {}\".format(subnets))\n",
    "print(\"Security Groups: {}\".format(security_groups))"
   ]
  },
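  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The loop above polls every 0.5 seconds and has no upper bound on how long it waits. One possible alternative is a small polling helper with a timeout. This is a sketch only: `wait_while`, its parameters, and the fake status sequence are hypothetical and stand in for calls to `describe_stacks`:\n",
    "\n",
    "```python\n",
    "import time\n",
    "\n",
    "\n",
    "def wait_while(get_status, in_progress='CREATE_IN_PROGRESS', poll_seconds=5.0, timeout_seconds=600.0):\n",
    "    # Hypothetical helper: call get_status() until it stops returning in_progress.\n",
    "    deadline = time.monotonic() + timeout_seconds\n",
    "    status = get_status()\n",
    "    while status == in_progress:\n",
    "        if time.monotonic() > deadline:\n",
    "            raise TimeoutError('Still {} after {} seconds'.format(in_progress, timeout_seconds))\n",
    "        time.sleep(poll_seconds)\n",
    "        status = get_status()\n",
    "    return status\n",
    "\n",
    "\n",
    "# Stand-in for cfn.describe_stacks(...)['Stacks'][0]['StackStatus'].\n",
    "fake_statuses = iter(['CREATE_IN_PROGRESS', 'CREATE_IN_PROGRESS', 'CREATE_COMPLETE'])\n",
    "final_status = wait_while(lambda: next(fake_statuses), poll_seconds=0.01)\n",
    "print(final_status)\n",
    "```\n",
    "\n",
    "boto3 also provides a `stack_create_complete` CloudFormation waiter, which may be a simpler option than a hand-written loop."
   ]
  },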
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### VPC training in SageMaker\n",
    "Now, we create the TensorFlow estimator, passing the `train_instance_type` and `distributions`, along with the `security_group_ids` and `subnets` of the VPC."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = TensorFlow(entry_point=train_script,\n",
    "                       role=sagemaker_iam_role,\n",
    "                       train_instance_count=instance_count,\n",
    "                       train_instance_type=train_instance_type,\n",
    "                       script_mode=True,\n",
    "                       framework_version='1.13',\n",
    "                       py_version='py3',\n",
    "                       distributions=distributions,\n",
    "                       security_group_ids=security_groups,  # returned by create_vpn_infra()\n",
    "                       subnets=subnets,  # returned by create_vpn_infra()\n",
    "                       base_job_name='hvd-mnist-vpc')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Call `fit()` to start the training"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.fit({\"train\":s3_train_path, \"test\":s3_test_path})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After training is completed, you can host the saved model by using TensorFlow Serving on SageMaker. For an example that uses TensorFlow Serving, see [https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_serving_container/tensorflow_serving_container.ipynb](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_serving_container/tensorflow_serving_container.ipynb)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reference Links:\n",
    "* [SageMaker Container MPI Support](https://github.com/aws/sagemaker-containers/blob/master/src/sagemaker_containers/_mpi.py)\n",
    "* [Horovod Official Documentation](https://github.com/uber/horovod)\n",
    "* [SageMaker TensorFlow script mode example](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_script_mode_quickstart/tensorflow_script_mode_quickstart.ipynb)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
